Narrow Transformation

map(): The map() transformation applies a function to every element of an RDD and returns a new RDD containing the results, one output element per input element. The function can be any function you supply. The element type of the returned RDD may differ from the input type: for example, the input RDD can hold String values and, after map(), the returned RDD can hold Boolean values.

val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
val a = rdd.map(x => x + 2).collect   // Array(3, 4, 5, 6, 7)
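
The String-to-Boolean case mentioned above can be sketched as follows. This is a minimal illustration, assuming a spark-shell session where `sc` is already defined; the names `words` and `isLong` and the length threshold are made up for the example.

```scala
// Assumes spark-shell, where `sc` (the SparkContext) is predefined.
val words = sc.parallelize(Seq("spark", "rdd", "example"))   // RDD[String]

// The function returns a Boolean, so the result is an RDD[Boolean]
val isLong = words.map(w => w.length > 4)

isLong.collect   // Array(true, false, true)
```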



Basic map examples in Scala
val x = sc.parallelize(List("spark", "rdd", "example",  "sample", "example"), 3)
val y = x.map(x => (x, 1))     
y.collect

val x = sc.parallelize(List("spark", "rdd", "example",  "sample", "example"), 3)
val y = x.map((_,1))
y.collect

val x = sc.parallelize(List("spark", "rdd", "example",  "sample", "example"), 3)
val y = x.map(x=> (x,x.length))
y.collect



val x = sc.parallelize(List("spark", "rdd", "example",  "sample", "example"), 3)
val y = x.map(x=> x.toUpperCase())
y.collect


val x = sc.parallelize(List("spark", "rdd", "example",  "sample", "example"), 3)
val y = x.map(x=> x.toLowerCase())
y.collect  



Basic map example as a standalone Scala application
import org.apache.spark.sql.SparkSession

object mapTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
    // A standalone application has no predefined sc, so take the SparkContext from the session
    val sc = spark.sparkContext
    val data = sc.textFile("spark_test.txt")
    // Pair each line with its length
    val mapFile = data.map(line => (line, line.length))
    mapFile.foreach(println)
    spark.stop()
  }
}

flatMap(): With flatMap(), each input element can produce many elements in the output RDD. The simplest use of flatMap() is to split each input string into words. map() and flatMap() are similar in that both take an element (for example, a line) from the input RDD and apply a function to it.
  • The key difference between map() and flatMap() is that map() returns exactly one element per input element, while flatMap() can return a list of elements (0 or more). The output of flatMap() is also flattened.
val data = sc.textFile("/FileStore/tables/data.txt")
val flatmapFile = data.flatMap(lines => lines.split(" "))
flatmapFile.collect


val words = sc.textFile("/FileStore/tables/data.txt")
val word1 = words.flatMap(x => x.split(" "))   // one element per word
val word2 = word1.map(x => (x, 1))             // (word, 1) pairs
val word3 = word2.reduceByKey(_ + _)           // count per word (reduceByKey shuffles, so it is a wide transformation)
word1.collect                    // the individual words
word2.collect.foreach(println)   // the (word, 1) pairs
word3.collect.foreach(println)   // the (word, count) pairs


Basic flatMap example in Scala
sc.parallelize(1 to 9, 3).flatMap(x=>List(x,x,x)).collect

map vs. flatMap example in Scala
val rdd = sc.parallelize(Seq("Where is Mount Everest","Himalayas India"))
rdd.collect
rdd.map(x => x.split(" ")).collect
rdd.flatMap(x => x.split(" ")).collect
rdd.map(x => x.split(" ")).count()
rdd.flatMap(x=>x.split(" ")).map(x=>(x, x.length)).collect

Filter: The Spark RDD filter() function returns a new RDD containing only the elements that satisfy a predicate.
  • filter is a transformation in Spark, hence it is lazily evaluated
  • It is a narrow operation because it does not shuffle data from one partition to multiple partitions
  • filter accepts a predicate as an argument and drops the elements of the source RDD that do not satisfy it
For example, suppose an RDD contains the first five natural numbers (1, 2, 3, 4, and 5) and the predicate checks for an even number. The resulting RDD after the filter will contain only the even numbers, i.e., 2 and 4.
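
The even-number example just described could look like this in spark-shell (assuming `sc` is predefined; the variable names are illustrative):

```scala
// Assumes spark-shell, where `sc` is predefined.
val firstFive = sc.parallelize(Seq(1, 2, 3, 4, 5))

// filter is lazy: this line only records the transformation
val evens = firstFive.filter(n => n % 2 == 0)

// collect is an action, so the predicate is evaluated here
evens.collect   // Array(2, 4)
```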

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 2)
val x1=x.filter(e => e > 4).collect

val num = sc.parallelize(List(1, 2, 3, 4, 5))
val rdd = num.filter(x => x != 5)
The same filter written with the underscore placeholder:
val num = sc.parallelize(List(1, 2, 3, 4, 5))
val rdd = num.filter(_ != 5)

In a very large text file, you may just want to check whether a particular keyword exists.
val line = spark.read.textFile("/FileStore/tables/data.txt")
val word = line.flatMap(_.split(","))
val output = word.filter { x => x.startsWith("S") }    // keep only the words that start with the letter "S"
output.collect.foreach(println)


val output1=line.filter(x=>x.contains("Spark"))
output1.collect.foreach(println)

val rdd = sc.parallelize(Seq("Where is Mount Everest","Himalayas India"))
rdd.filter(x=>x.contains("Himalayas")).collect


Keep only the even numbers
sc.parallelize(1 to 15).filter(x=>(x%2==0)).collect


Keep only the odd numbers
sc.parallelize(1 to 15).filter(x=>(x%2!=0)).collect

sc.parallelize(1 to 15).filter(_%5==0).collect
sc.parallelize(1 to 9, 3).flatMap(x=>List(x,x,x)).filter(x=> x%2 ==0 ).collect

mapPartitions()
Similar to the map() transformation, but the function runs once on each partition (block) of the RDD instead of once on each element. This makes mapPartitions() useful when you are looking for a performance gain, because your function is called once per partition rather than once per element. Suppose you have the elements 1 to 100 distributed among 10 partitions, i.e. 10 elements per partition. map() will call func 100 times to process these 100 elements, but mapPartitions() will call func once per partition, i.e. 10 times.

Unlike map(), the function passed to mapPartitions() receives an iterator over the elements of the partition and must return an iterator. If the function collects its results in memory before returning (for example, into a List), those results stay in memory until the whole partition has been processed.

sc.parallelize(1 to 9, 3).map(x=>(x, "Hello")).collect
sc.parallelize(1 to 9, 3).partitions.size

sc.parallelize(1 to 9, 3).mapPartitions(x=>(Array("Hello").iterator)).collect
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next).iterator)).collect


In the first example, map() is applied to a dataset distributed across 3 partitions, so the function is called 9 times, once per element. In the second example, mapPartitions() runs the function only 3 times, once for each partition. The string "Hello" had to be wrapped in an iterator because the function passed to mapPartitions() must return an iterator. In the third step, next is called on the input iterator to show the first element of each partition. Note that an iterator only moves forward: each call to next returns the following element, and you cannot step back.
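
The call counts discussed above (once per element for map(), once per partition for mapPartitions()) can be checked with accumulators. This is a rough sketch, assuming spark-shell with `sc` predefined and a run without task retries, since retried tasks would add to the accumulators again. The accumulator names are made up.

```scala
// Assumes spark-shell, where `sc` is predefined.
val rdd = sc.parallelize(1 to 100, 10)   // 100 elements in 10 partitions

val mapCalls = sc.longAccumulator("mapCalls")
val partitionCalls = sc.longAccumulator("mapPartitionsCalls")

// count is an action, so it forces both transformations to run
rdd.map { x => mapCalls.add(1); x }.count
rdd.mapPartitions { it => partitionCalls.add(1); it }.count

mapCalls.value         // 100: the function ran once per element
partitionCalls.value   // 10: the function ran once per partition
```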


sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next,x.next, "|").iterator)).collect


Across the two calls to next, the value for partition 1 advances from 1 to 2, for partition 2 from 4 to 5, and for partition 3 from 7 to 8. You can keep advancing until hasNext is false (hasNext is an Iterator method that tells you whether any elements are left; it returns true or false). For example,

sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.hasNext).iterator)).collect


Here hasNext is true because elements are still left in each partition. If we consume all three elements of each partition, hasNext returns false. For example,
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.next, x.next, x.hasNext).iterator)).collect


Just for our understanding, if you try to call next a fourth time, you will get an error, which is expected because each partition holds only three elements.
sc.parallelize(1 to 9, 3).mapPartitions(x=>(List(x.next, x.next, x.next, x.next,x.hasNext).iterator)).collect
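
One way to avoid the error above is to check hasNext before calling next, or to use take(n), which stops at the end of the iterator. A minimal sketch, assuming spark-shell with `sc` predefined; the -1 default is a hypothetical placeholder chosen for this example.

```scala
// Assumes spark-shell, where `sc` is predefined.
val rdd = sc.parallelize(1 to 9, 3)

// Guard next with hasNext: return the fourth element if it exists, otherwise -1
val fourthOrDefault = rdd.mapPartitions { it =>
  val rest = it.drop(3)   // skip the first three elements
  Iterator(if (rest.hasNext) rest.next() else -1)
}.collect   // Array(-1, -1, -1), since each partition holds only three elements

// take(4) returns at most four elements and never reads past the end
val sizes = rdd.mapPartitions(it => Iterator(it.take(4).size)).collect   // Array(3, 3, 3)
```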


You can think of the map() transformation as a special case of mapPartitions() in which each partition holds just one element.
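
Another way to see the relationship between the two: map() can be expressed with mapPartitions() by mapping over the partition's iterator. The sketch below assumes spark-shell with `sc` predefined and also shows a per-partition aggregate, which map() alone cannot produce.

```scala
// Assumes spark-shell, where `sc` is predefined.
val rdd = sc.parallelize(1 to 9, 3)

// The same element-wise doubling, written both ways
val viaMap = rdd.map(_ * 2).collect
val viaMapPartitions = rdd.mapPartitions(it => it.map(_ * 2)).collect
viaMap.sameElements(viaMapPartitions)   // true

// One result per partition: the sum of that partition's elements
val partitionSums = rdd.mapPartitions(it => Iterator(it.sum)).collect   // Array(6, 15, 24)
```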
 
